/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package Beam.common;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.Bigquery.Datasets;
import com.google.api.services.bigquery.Bigquery.Tables;
import com.google.api.services.bigquery.model.Dataset;
import com.google.api.services.bigquery.model.DatasetReference;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.model.Subscription;
import com.google.api.services.pubsub.model.Topic;
import com.google.auth.Credentials;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.util.Transport;
import org.joda.time.Duration;

/**
 * The utility class that sets up and tears down external resources,
 * and cancels the streaming pipelines once the program terminates.
 *
 * <p>It is used to run Beam examples.
 */
public class ExampleUtils {

    /** HTTP status code that is interpreted as "the requested resource does not exist". */
    private static final int SC_NOT_FOUND = 404;

    /** Number of times the shutdown hook polls a pipeline for a terminal state. */
    private static final int CANCELLATION_VERIFY_ATTEMPTS = 6;

    /** Delay, in seconds, between two polls of a pipeline's state in the shutdown hook. */
    private static final long CANCELLATION_VERIFY_INTERVAL_SECONDS = 10;

    /**
     * \p{L} denotes the category of Unicode letters,
     * so this pattern will match on everything that is not a letter.
     *
     * <p>It is used for tokenizing strings in the wordcount examples.
     */
    public static final String TOKENIZER_PATTERN = "[^\\p{L}]+";

    private final PipelineOptions options;
    // Both clients are created lazily on first use, see getBigQueryClient() / getPubsubClient().
    private Bigquery bigQueryClient = null;
    private Pubsub pubsubClient = null;
    private final Set<PipelineResult> pipelinesToCancel = Sets.newHashSet();
    // Messages are buffered here and printed in one batch by printPendingMessages().
    private final List<String> pendingMessages = Lists.newArrayList();
    // Guards against registering more than one shutdown hook for this instance.
    private boolean shutdownHookRegistered = false;

    /**
     * Creates the utility for the given options. No external resource is touched until
     * {@link #setup()} or one of the other methods is called.
     *
     * @param options the pipeline options the resource names and credentials are read from
     */
    public ExampleUtils(PipelineOptions options) {
        this.options = options;
    }

    /**
     * Sets up external resources that are required by the example,
     * such as Pub/Sub topics and BigQuery tables.
     *
     * <p>When a {@link GoogleJsonResponseException} is thrown, the whole setup is attempted again,
     * using a backoff configured with 3 retries and an initial delay of 200 milliseconds. If no
     * attempt succeeds, the last such exception is rethrown wrapped in a
     * {@link RuntimeException}.
     *
     * @throws IOException if there is a problem setting up the resources
     */
    public void setup() throws IOException {
        Sleeper sleeper = Sleeper.DEFAULT;
        BackOff backOff =
                FluentBackoff.DEFAULT
                        .withMaxRetries(3).withInitialBackoff(Duration.millis(200)).backoff();
        Throwable lastException = null;
        try {
            do {
                try {
                    setupPubsub();
                    setupBigQueryTable();
                    return;
                } catch (GoogleJsonResponseException e) {
                    lastException = e;
                }
            } while (BackOffUtils.next(sleeper, backOff));
        } catch (InterruptedException e) {
            // Stop retrying, but keep the interrupt flag set for the caller. The failure that
            // triggered the retry is reported below.
            Thread.currentThread().interrupt();
        }
        throw new RuntimeException(lastException);
    }

    /**
     * Sets up the Google Cloud Pub/Sub topic and, if one is configured, the subscription.
     *
     * <p>If the topic doesn't exist, a new topic with the given name will be created. Nothing is
     * done when no topic name is configured (null or empty).
     *
     * @throws IOException if there is a problem setting up the Pub/Sub topic
     */
    public void setupPubsub() throws IOException {
        ExamplePubsubTopicAndSubscriptionOptions pubsubOptions =
                options.as(ExamplePubsubTopicAndSubscriptionOptions.class);
        if (!isNonEmpty(pubsubOptions.getPubsubTopic())) {
            return;
        }
        pendingMessages.add("**********************Set Up Pubsub************************");
        setupPubsubTopic(pubsubOptions.getPubsubTopic());
        pendingMessages.add("The Pub/Sub topic has been set up for this example: "
                + pubsubOptions.getPubsubTopic());

        if (isNonEmpty(pubsubOptions.getPubsubSubscription())) {
            setupPubsubSubscription(
                    pubsubOptions.getPubsubTopic(), pubsubOptions.getPubsubSubscription());
            pendingMessages.add("The Pub/Sub subscription has been set up for this example: "
                    + pubsubOptions.getPubsubSubscription());
        }
    }

    /**
     * Sets up the BigQuery table with the given schema.
     *
     * <p>If the table already exists, the schema has to match the given one. Otherwise, the example
     * will throw a RuntimeException. If the table doesn't exist, a new table with the given schema
     * will be created. Nothing is done unless dataset, table and schema are all configured.
     *
     * @throws IOException if there is a problem setting up the BigQuery table
     */
    public void setupBigQueryTable() throws IOException {
        ExampleBigQueryTableOptions bigQueryTableOptions =
                options.as(ExampleBigQueryTableOptions.class);
        if (isBigQueryTableConfigured(bigQueryTableOptions)) {
            pendingMessages.add("******************Set Up Big Query Table*******************");
            setupBigQueryTable(bigQueryTableOptions.getProject(),
                    bigQueryTableOptions.getBigQueryDataset(),
                    bigQueryTableOptions.getBigQueryTable(),
                    bigQueryTableOptions.getBigQuerySchema());
            pendingMessages.add("The BigQuery table has been set up for this example: "
                    + tableSpec(bigQueryTableOptions));
        }
    }

    /**
     * Tears down external resources that can be deleted upon the example's completion.
     *
     * <p>Deletion is best effort: a failure is recorded as a pending message (including its
     * cause) and does not abort the remaining steps. The BigQuery table is never deleted; a
     * reminder to delete it manually is recorded instead.
     */
    private void tearDown() {
        pendingMessages.add("*************************Tear Down*************************");
        ExamplePubsubTopicAndSubscriptionOptions pubsubOptions =
                options.as(ExamplePubsubTopicAndSubscriptionOptions.class);
        if (isNonEmpty(pubsubOptions.getPubsubTopic())) {
            try {
                deletePubsubTopic(pubsubOptions.getPubsubTopic());
                pendingMessages.add("The Pub/Sub topic has been deleted: "
                        + pubsubOptions.getPubsubTopic());
            } catch (IOException e) {
                pendingMessages.add("Failed to delete the Pub/Sub topic : "
                        + pubsubOptions.getPubsubTopic() + " (" + e + ")");
            }
            if (isNonEmpty(pubsubOptions.getPubsubSubscription())) {
                try {
                    deletePubsubSubscription(pubsubOptions.getPubsubSubscription());
                    pendingMessages.add("The Pub/Sub subscription has been deleted: "
                            + pubsubOptions.getPubsubSubscription());
                } catch (IOException e) {
                    pendingMessages.add("Failed to delete the Pub/Sub subscription : "
                            + pubsubOptions.getPubsubSubscription() + " (" + e + ")");
                }
            }
        }

        ExampleBigQueryTableOptions bigQueryTableOptions =
                options.as(ExampleBigQueryTableOptions.class);
        if (isBigQueryTableConfigured(bigQueryTableOptions)) {
            pendingMessages.add("The BigQuery table might contain the example's output, "
                    + "and it is not deleted automatically: "
                    + tableSpec(bigQueryTableOptions));
            pendingMessages.add("Please go to the Developers Console to delete it manually."
                    + " Otherwise, you may be charged for its usage.");
        }
    }

    /** Returns true if the given option value is neither null nor the empty string. */
    private static boolean isNonEmpty(String value) {
        return value != null && !value.isEmpty();
    }

    /** Returns true if dataset, table and schema are all present in the given options. */
    private static boolean isBigQueryTableConfigured(ExampleBigQueryTableOptions tableOptions) {
        return tableOptions.getBigQueryDataset() != null
                && tableOptions.getBigQueryTable() != null
                && tableOptions.getBigQuerySchema() != null;
    }

    /** Formats the configured table as {@code project:dataset.table} for user messages. */
    private static String tableSpec(ExampleBigQueryTableOptions tableOptions) {
        return tableOptions.getProject()
                + ":" + tableOptions.getBigQueryDataset()
                + "." + tableOptions.getBigQueryTable();
    }

    /**
     * Returns a BigQuery client builder using the specified {@link BigQueryOptions}.
     */
    private static Bigquery.Builder newBigQueryClient(BigQueryOptions options) {
        return new Bigquery.Builder(Transport.getTransport(), Transport.getJsonFactory(),
                chainHttpRequestInitializer(
                        options.getGcpCredential(),
                        // Do not log 404. It clutters the output and is possibly even required by the caller.
                        new RetryHttpRequestInitializer(ImmutableList.of(SC_NOT_FOUND))))
                .setApplicationName(options.getAppName())
                .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
    }

    /**
     * Returns a Pubsub client builder using the specified {@link PubsubOptions}.
     */
    private static Pubsub.Builder newPubsubClient(PubsubOptions options) {
        return new Pubsub.Builder(Transport.getTransport(), Transport.getJsonFactory(),
                chainHttpRequestInitializer(
                        options.getGcpCredential(),
                        // Do not log 404. It clutters the output and is possibly even required by the caller.
                        new RetryHttpRequestInitializer(ImmutableList.of(SC_NOT_FOUND))))
                .setRootUrl(options.getPubsubRootUrl())
                .setApplicationName(options.getAppName())
                .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
    }

    /**
     * Chains a credential initializer in front of the given request initializer. When no
     * credential is available, a {@link NullCredentialInitializer} takes its place.
     */
    private static HttpRequestInitializer chainHttpRequestInitializer(
            Credentials credential, HttpRequestInitializer httpRequestInitializer) {
        HttpRequestInitializer credentialInitializer = (credential == null)
                ? new NullCredentialInitializer()
                : new HttpCredentialsAdapter(credential);
        return new ChainingHttpRequestInitializer(credentialInitializer, httpRequestInitializer);
    }

    /** Returns the BigQuery client, building it from the options on first use. */
    private Bigquery getBigQueryClient() {
        if (bigQueryClient == null) {
            bigQueryClient = newBigQueryClient(options.as(BigQueryOptions.class)).build();
        }
        return bigQueryClient;
    }

    /** Returns the Pub/Sub client, building it from the options on first use. */
    private Pubsub getPubsubClient() {
        if (pubsubClient == null) {
            pubsubClient = newPubsubClient(options.as(PubsubOptions.class)).build();
        }
        return pubsubClient;
    }

    /**
     * Creates the dataset and the table if they are missing, or verifies the schema of an
     * already existing table.
     *
     * @throws IOException if a BigQuery request fails
     * @throws RuntimeException if the table exists with a schema different from {@code schema}
     */
    private void setupBigQueryTable(String projectId, String datasetId, String tableId,
                                    TableSchema schema) throws IOException {
        Bigquery client = getBigQueryClient();

        Datasets datasetService = client.datasets();
        if (executeNullIfNotFound(datasetService.get(projectId, datasetId)) == null) {
            Dataset newDataset = new Dataset().setDatasetReference(
                    new DatasetReference().setProjectId(projectId).setDatasetId(datasetId));
            datasetService.insert(projectId, newDataset).execute();
        }

        Tables tableService = client.tables();
        Table table = executeNullIfNotFound(tableService.get(projectId, datasetId, tableId));
        if (table == null) {
            Table newTable = new Table().setSchema(schema).setTableReference(
                    new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId));
            tableService.insert(projectId, datasetId, newTable).execute();
            return;
        }

        // Compare from the expected schema's side so that an existing table without a schema is
        // reported as a mismatch instead of failing with a NullPointerException.
        TableSchema existingSchema = table.getSchema();
        if (!schema.equals(existingSchema)) {
            String actual = (existingSchema == null) ? "null" : existingSchema.toPrettyString();
            throw new RuntimeException(
                    "Table exists and schemas do not match, expecting: " + schema.toPrettyString()
                            + ", actual: " + actual);
        }
    }

    /**
     * Creates the Pub/Sub topic unless it already exists.
     *
     * @throws IOException if a Pub/Sub request fails
     */
    private void setupPubsubTopic(String topic) throws IOException {
        Pubsub client = getPubsubClient();
        if (executeNullIfNotFound(client.projects().topics().get(topic)) == null) {
            client.projects().topics().create(topic, new Topic().setName(topic)).execute();
        }
    }

    /**
     * Creates the Pub/Sub subscription on the given topic unless it already exists. New
     * subscriptions are created with an ack deadline of 60 seconds.
     *
     * @throws IOException if a Pub/Sub request fails
     */
    private void setupPubsubSubscription(String topic, String subscription) throws IOException {
        Pubsub client = getPubsubClient();
        if (executeNullIfNotFound(client.projects().subscriptions().get(subscription)) == null) {
            Subscription subInfo = new Subscription()
                    .setAckDeadlineSeconds(60)
                    .setTopic(topic);
            client.projects().subscriptions().create(subscription, subInfo).execute();
        }
    }

    /**
     * Deletes the Google Cloud Pub/Sub topic if it exists.
     *
     * @throws IOException if there is a problem deleting the Pub/Sub topic
     */
    private void deletePubsubTopic(String topic) throws IOException {
        Pubsub client = getPubsubClient();
        if (executeNullIfNotFound(client.projects().topics().get(topic)) != null) {
            client.projects().topics().delete(topic).execute();
        }
    }

    /**
     * Deletes the Google Cloud Pub/Sub subscription if it exists.
     *
     * @throws IOException if there is a problem deleting the Pub/Sub subscription
     */
    private void deletePubsubSubscription(String subscription) throws IOException {
        Pubsub client = getPubsubClient();
        if (executeNullIfNotFound(client.projects().subscriptions().get(subscription)) != null) {
            client.projects().subscriptions().delete(subscription).execute();
        }
    }

    /**
     * Waits for the pipeline to finish and cancels it before the program exits.
     *
     * <p>Unless the {@code keepJobsRunning} option is set, a single shutdown hook is registered
     * that tears down the resources and cancels every pipeline passed to this method so far.
     *
     * @param result the pipeline to wait for
     * @throws RuntimeException if waiting fails; the original failure is kept as the cause
     */
    public void waitToFinish(PipelineResult result) {
        pipelinesToCancel.add(result);
        if (!options.as(ExampleOptions.class).getKeepJobsRunning() && !shutdownHookRegistered) {
            // The hook holds a reference to the live set, so pipelines added by later calls are
            // covered as well and a second hook would only repeat the same work.
            addShutdownHook(pipelinesToCancel);
            shutdownHookRegistered = true;
        }
        try {
            result.waitUntilFinish();
        } catch (UnsupportedOperationException e) {
            // The given PipelineResult doesn't support waitUntilFinish(),
            // such as EvaluationResults returned by DirectRunner. Clean up right away.
            tearDown();
            printPendingMessages();
        } catch (Exception e) {
            throw new RuntimeException("Failed to wait the pipeline until finish: " + result, e);
        }
    }

    /**
     * Registers a JVM shutdown hook that tears down the resources, prints the pending messages,
     * cancels the given pipelines and then checks that they reached a terminal state.
     */
    private void addShutdownHook(final Collection<PipelineResult> pipelineResults) {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                tearDown();
                printPendingMessages();
                cancelPipelines(pipelineResults);
                verifyCancellations(pipelineResults);
            }
        });
    }

    /** Requests cancellation of every pipeline; a failure is printed and does not stop the loop. */
    private static void cancelPipelines(Collection<PipelineResult> pipelineResults) {
        for (PipelineResult pipelineResult : pipelineResults) {
            try {
                pipelineResult.cancel();
            } catch (IOException e) {
                System.out.println("Failed to cancel the job.");
                System.out.println(e.getMessage());
            }
        }
    }

    /**
     * Polls each pipeline until it reports a terminal state, giving up on a pipeline after
     * {@link #CANCELLATION_VERIFY_ATTEMPTS} polls.
     */
    private static void verifyCancellations(Collection<PipelineResult> pipelineResults) {
        for (PipelineResult pipelineResult : pipelineResults) {
            boolean cancellationVerified = false;
            for (int attemptsLeft = CANCELLATION_VERIFY_ATTEMPTS; attemptsLeft > 0; attemptsLeft--) {
                if (pipelineResult.getState().isTerminal()) {
                    cancellationVerified = true;
                    break;
                }
                System.out.println(
                        "The example pipeline is still running. Verifying the cancellation.");
                Uninterruptibles.sleepUninterruptibly(
                        CANCELLATION_VERIFY_INTERVAL_SECONDS, TimeUnit.SECONDS);
            }
            if (!cancellationVerified) {
                System.out.println("Failed to verify the cancellation for job: " + pipelineResult);
            }
        }
    }

    /** Prints all buffered messages to standard output, framed by separator lines. */
    private void printPendingMessages() {
        System.out.println();
        System.out.println("***********************************************************");
        System.out.println("***********************************************************");
        for (String message : pendingMessages) {
            System.out.println(message);
        }
        System.out.println("***********************************************************");
        System.out.println("***********************************************************");
    }

    /**
     * Executes the request and maps a 404 response to {@code null}.
     *
     * @return the response, or {@code null} if the service answered with status 404
     * @throws IOException for every other failure, including non-404 error responses
     */
    private static <T> T executeNullIfNotFound(
            AbstractGoogleClientRequest<T> request) throws IOException {
        try {
            return request.execute();
        } catch (GoogleJsonResponseException e) {
            if (e.getStatusCode() == SC_NOT_FOUND) {
                return null;
            }
            throw e;
        }
    }
}
